/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.netease.arctic.trino.iceberg;

import static com.google.common.base.Preconditions.checkState;
import static io.airlift.testing.Closeables.closeAllSuppress;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.util.Objects.requireNonNull;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.DistributedQueryRunner;
import io.trino.tpch.TpchTable;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public final class ArcticQueryRunner {
  private static final Logger log = Logger.get(ArcticQueryRunner.class);

  public static final String ARCTIC_CATALOG = "arctic";

  private ArcticQueryRunner() {}

  public static DistributedQueryRunner createIcebergQueryRunner(TpchTable<?>... tables)
      throws Exception {
    return builder().setInitialTables(tables).build();
  }

  public static Builder builder() {
    return new Builder();
  }

  public static class Builder extends DistributedQueryRunner.Builder<Builder> {
    private Optional<File> metastoreDirectory = Optional.empty();
    private ImmutableMap.Builder<String, String> icebergProperties = ImmutableMap.builder();
    private Optional<SchemaInitializer> schemaInitializer = Optional.empty();

    protected Builder() {
      super(testSessionBuilder().setCatalog(ARCTIC_CATALOG).setSchema("tpch").build());
    }

    public Builder setMetastoreDirectory(File metastoreDirectory) {
      this.metastoreDirectory = Optional.of(metastoreDirectory);
      return self();
    }

    public Builder setIcebergProperties(Map<String, String> icebergProperties) {
      this.icebergProperties =
          ImmutableMap.<String, String>builder()
              .putAll(requireNonNull(icebergProperties, "icebergProperties is null"));
      return self();
    }

    public Builder addIcebergProperty(String key, String value) {
      this.icebergProperties.put(key, value);
      return self();
    }

    public Builder setInitialTables(TpchTable<?>... initialTables) {
      return setInitialTables(ImmutableList.copyOf(initialTables));
    }

    public Builder setInitialTables(Iterable<TpchTable<?>> initialTables) {
      setSchemaInitializer(SchemaInitializer.builder().withClonedTpchTables(initialTables).build());
      return self();
    }

    public Builder setSchemaInitializer(SchemaInitializer schemaInitializer) {
      checkState(this.schemaInitializer.isEmpty(), "schemaInitializer is already set");
      this.schemaInitializer =
          Optional.of(requireNonNull(schemaInitializer, "schemaInitializer is null"));
      amendSession(sessionBuilder -> sessionBuilder.setSchema(schemaInitializer.getSchemaName()));
      return self();
    }

    @Override
    public DistributedQueryRunner build() throws Exception {
      DistributedQueryRunner queryRunner = super.build();
      try {
        queryRunner.installPlugin(new TpchPlugin());
        queryRunner.createCatalog("tpch", "tpch");

        queryRunner.installPlugin(new TestArcticPlugin());
        Map<String, String> icebergProperties =
            new HashMap<>(this.icebergProperties.buildOrThrow());
        icebergProperties.put(
            "arctic.url",
            queryRunner
                .getCoordinator()
                .getBaseDataDir()
                .resolve("arctic")
                .toAbsolutePath()
                .toString());
        queryRunner.createCatalog(ARCTIC_CATALOG, "arctic", icebergProperties);
        schemaInitializer.orElse(SchemaInitializer.builder().build()).accept(queryRunner);

        return queryRunner;
      } catch (Exception e) {
        closeAllSuppress(e, queryRunner);
        throw e;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    DistributedQueryRunner queryRunner = null;
    queryRunner =
        ArcticQueryRunner.builder()
            .setExtraProperties(ImmutableMap.of("http-server.http.port", "8080"))
            .setInitialTables(TpchTable.getTables())
            .build();
    Thread.sleep(10);
    Logger log = Logger.get(ArcticQueryRunner.class);
    log.info("======== SERVER STARTED ========");
    log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
  }
}
